package com.danan.data_loader.initializer;

import com.danan.data_loader.config.KafkaConfig;
import com.danan.data_loader.config.SourceConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Created with IntelliJ IDEA.
 *
 * @Author: NanHuang
 * @Date: 2023/05/13/11:51
 * @Description:
 */
@Configuration
public class DataStreamInitializer {

    /**
     * @Description 获取数据流
     * @Param [env]
     **/
    @Bean
    public DataStreamSource<String> dataStreamSource(@Autowired StreamExecutionEnvironment env,
                                                     @Autowired KafkaConfig kafkaConfig,
                                                     @Autowired SourceConfig sourceConfig){
        KafkaSource<String> kafkaSource = null;
        switch (kafkaConfig.getConsumptionModel().toLowerCase()){
            case "earliest":
                kafkaSource = KafkaSource.<String>builder()
                        .setBootstrapServers(kafkaConfig.getBootstrapServer())
                        .setTopics(String.format("ods_%s_%s",sourceConfig.getDatabase(),sourceConfig.getSchemaList()[0]))
                        .setGroupId(kafkaConfig.getConsumerGroupId())
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();
                break;
            case "latest":
                kafkaSource = KafkaSource.<String>builder()
                        .setBootstrapServers(kafkaConfig.getBootstrapServer())
                        .setTopics(String.format("ods_%s_%s",sourceConfig.getDatabase(),sourceConfig.getSchemaList()[0]))
                        .setGroupId(kafkaConfig.getConsumerGroupId())
                        .setStartingOffsets(OffsetsInitializer.latest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();
                break;
            default:
                throw new RuntimeException("The consumption model is not defined. Please select a consumption model from the list [ earliest , latest ]");
        }
        return env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(),"kafka-source");
    }

}
